package org.budo.warehouse.web.controller;

import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.budo.support.javax.sql.util.JdbcUtil;
import org.budo.support.javax.sql.util.ResultSetUtil;
import org.budo.support.lang.util.ListUtil;
import org.budo.support.lang.util.MapUtil;
import org.budo.support.lang.util.NumberUtil;
import org.budo.support.lang.util.ReflectUtil;
import org.budo.support.spring.util.SpringUtil;
import org.budo.time.Time;
import org.budo.time.TimePoint;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogPosition;

import lombok.extern.slf4j.Slf4j;

/**
 * @author lmw
 */
@Slf4j
@Controller
public class LogPositionController {
    private static final String _FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

    @Resource
    private ApplicationContext applicationContext;

    @ResponseBody
    @RequestMapping("log_position")
    public String logPosition() {
        List<Map<String, Object>> list = this.listLogPosition();

        TimePoint now = Time.now();

        String html = "<table border=\"1\">";

        html += "<tr>";
        html += "<td colspan=\"" + 3 + "\"></td>";
        html += "<td colspan=\"" + 4 + "\"></td>";
        html += "<td colspan=\"" + 3 + "\">" + now.toString(_FORMAT) + "</td>";
        html += "</tr>";
        html += "<tr>";

        html += "<td>DESTINATION</td>";
        html += "<td>MASTER</td>";
        html += "<td>POSITION</td>";
        html += "<td>SERVER_ID</td>";
        html += "<td>SLAVE_ID</td>";
        html += "<td>JOURNAL</td>";
        html += "<td>POSITION</td>";
        html += "<td>TIMESTAMP</td>";
        html += "<td>DIFF POSITION</td>";
        html += "<td>DIFF TIME</td>";
        html += "</tr>";

        for (Map<String, Object> map : list) {
            Object destination = map.get("destination");
            Object serverId = map.get("serverId");
            Object slaveId = map.get("slaveId");

            Map<String, Object> masterStatus = (Map<String, Object>) map.get("masterStatus");

            Long masterPosition = NumberUtil.toLong(masterStatus.get("Position"));

            html += "<tr>";
            html += "<td>" + destination + "</td>";
            html += "<td>" + masterStatus.get("File") + "</td>";
            html += "<td>" + masterPosition + "</td>";
            html += "<td>" + serverId + "</td>";
            html += "<td>" + slaveId + "</td>";

            LogPosition slavePosition = (LogPosition) map.get("slavePosition");

            if (null == slavePosition) {
                html += "<td colspan=\"5\"></td>";
            } else {
                EntryPosition postion = slavePosition.getPostion();

                html += "<td>" + postion.getJournalName() + "</td>";
                html += "<td>" + postion.getPosition() + "</td>";

                TimePoint slaveTime = Time.when(postion.getTimestamp());
                html += "<td>" + slaveTime.toString(_FORMAT) + "</td>";

                html += "<td>" + (masterPosition - postion.getPosition()) + "</td>";
                html += "<td>" + now.between(slaveTime).toMilliSeconds() + "</td>";
            }

            html += "</tr>";
        }

        html += "</table>";
        return html;
    }

    @ResponseBody
    @RequestMapping("log_position.json")
    public String logPositionJson() {
        List<Map<String, Object>> list = this.listLogPosition();

        return JSON.toJSONString(list);
    }

    private List<Map<String, Object>> listLogPosition() {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();

        List<String> canalInstanceNames = SpringUtil.getBeanNamesForType(this.applicationContext, CanalInstance.class);
        for (String canalInstanceName : canalInstanceNames) {
            CanalInstance canalInstance = (CanalInstance) SpringUtil.getBean(this.applicationContext, canalInstanceName);
            if (null == canalInstance) {
                log.error("#130 canalInstance=" + canalInstance + ", canalInstanceName=" + canalInstanceName);
                continue;
            }

            MysqlEventParser eventParser = (MysqlEventParser) canalInstance.getEventParser();
            AuthenticationInfo masterInfo = (AuthenticationInfo) ReflectUtil.getFieldValue(MysqlEventParser.class, "masterInfo", eventParser);

            Map<String, Object> masterStatus = this.masterStatus(masterInfo);
            Object slaveId = ReflectUtil.getFieldValue(MysqlEventParser.class, "slaveId", eventParser);

            InetSocketAddress masterAddress = masterInfo.getAddress();

            Map<String, Object> map = MapUtil.stringObjectMap( //
                    "destination", canalInstance.getDestination(), //
                    "serverId", eventParser.getServerId(), //
                    "slaveId", slaveId, //
                    "masterAddress", masterAddress, //
                    "masterUsername", masterInfo.getUsername(), //
                    "masterStatus", masterStatus);

            //
            CanalLogPositionManager logPositionManager = eventParser.getLogPositionManager();
            if (!logPositionManager.isStart()) {
                log.error("#144 logPositionManager not started, logPositionManager=" + logPositionManager + ", eventParser=" + eventParser //
                        + ", canalInstance=" + canalInstanceName + ", masterInfo=" + masterInfo + ", masterAddress=" + masterAddress);
                continue;
            }

            String destination = canalInstance.getDestination();
            LogPosition slavePosition = logPositionManager.getLatestIndexBy(destination);

            map.put("slavePosition", slavePosition);

            //
            list.add(map);
        }
        return list;
    }

    private Map<String, Object> masterStatus(AuthenticationInfo masterInfo) {
        log.info("#156 masterInfo.address=" + masterInfo.getAddress());

        String url = "jdbc:mysql://" + masterInfo.getAddress().getHostName() + ":" + masterInfo.getAddress().getPort();

        Connection connection = JdbcUtil.getConnection(url, masterInfo.getUsername(), masterInfo.getPassword());
        PreparedStatement preparedStatement = JdbcUtil.prepareStatement(connection, "SHOW MASTER STATUS");
        ResultSet resultSet = JdbcUtil.executeQuery(preparedStatement);

        List<Map<String, Object>> masterStatus = ResultSetUtil.toMapList(resultSet, false);

        JdbcUtil.close(resultSet);
        JdbcUtil.close(preparedStatement);
        JdbcUtil.close(connection);

        return ListUtil.first(masterStatus);
    }
}